67317cfa37bcb3d90fd7cc3213c04f740e97900d,cdap-app-fabric/src/main/java/co/cask/cdap/app/mapreduce/MapReduceMetricsInfo.java,MapReduceMetricsInfo,getJobCounters,#Map#Map#List#List#,135

Before Change


                getAggregates(mapTags, MapReduceMetrics.METRIC_INPUT_RECORDS));
    metrics.put(TaskCounter.MAP_OUTPUT_RECORDS.name(),
                getAggregates(mapTags, MapReduceMetrics.METRIC_OUTPUT_RECORDS));
    metrics.put(TaskCounter.MAP_OUTPUT_BYTES.name(),
                getAggregates(mapTags, MapReduceMetrics.METRIC_BYTES));

    metrics.put(TaskCounter.REDUCE_INPUT_RECORDS.name(),
                getAggregates(reduceTags, MapReduceMetrics.METRIC_INPUT_RECORDS));
    metrics.put(TaskCounter.REDUCE_OUTPUT_RECORDS.name(),
                getAggregates(reduceTags, MapReduceMetrics.METRIC_OUTPUT_RECORDS));

    float mapProgress = getAggregates(mapTags, MapReduceMetrics.METRIC_COMPLETION) / 100.0F;
    float reduceProgress = getAggregates(reduceTags, MapReduceMetrics.METRIC_COMPLETION) / 100.0F;

After Change


  }


  private MRJobInfo getJobCounters(Map<String, String> mapTags, Map<String, String> reduceTags,
                                   List<MRTaskInfo> mapTaskInfos, List<MRTaskInfo> reduceTaskInfos) throws Exception {
    HashMap<String, Long> metrics = Maps.newHashMap();

    Map<String, String> mapMetricsToCounters =
      ImmutableMap.of(prependSystem(MapReduceMetrics.METRIC_INPUT_RECORDS), TaskCounter.MAP_INPUT_RECORDS.name(),
                      prependSystem(MapReduceMetrics.METRIC_OUTPUT_RECORDS), TaskCounter.MAP_OUTPUT_RECORDS.name(),
                      prependSystem(MapReduceMetrics.METRIC_BYTES), TaskCounter.MAP_OUTPUT_BYTES.name(),
                      prependSystem(MapReduceMetrics.METRIC_COMPLETION), MapReduceMetrics.METRIC_COMPLETION);

    getAggregates(mapTags, mapMetricsToCounters, metrics);
    float mapProgress = metrics.remove(MapReduceMetrics.METRIC_COMPLETION) / 100.0F;

    Map<String, String> reduceMetricsToCounters =
      ImmutableMap.of(prependSystem(MapReduceMetrics.METRIC_INPUT_RECORDS), TaskCounter.REDUCE_INPUT_RECORDS.name(),
                      prependSystem(MapReduceMetrics.METRIC_OUTPUT_RECORDS), TaskCounter.REDUCE_OUTPUT_RECORDS.name(),
                      prependSystem(MapReduceMetrics.METRIC_COMPLETION), MapReduceMetrics.METRIC_COMPLETION);

    getAggregates(reduceTags, reduceMetricsToCounters, metrics);
    float reduceProgress = metrics.remove(MapReduceMetrics.METRIC_COMPLETION) / 100.0F;